Skip to content

feat: add JVM UDF framework for native execution#4232

Open
andygrove wants to merge 2 commits intoapache:mainfrom
andygrove:jvm-udf-framework
Open

feat: add JVM UDF framework for native execution#4232
andygrove wants to merge 2 commits intoapache:mainfrom
andygrove:jvm-udf-framework

Conversation

@andygrove
Copy link
Copy Markdown
Member

@andygrove andygrove commented May 5, 2026

Which issue does this PR close?

Part of #4193

Rationale for this change

This PR adds the core JVM UDF framework that enables Comet to invoke JVM-side UDF implementations operating on Arrow data via JNI. This allows us to quickly implement expressions with 100% Spark compatibility without re-implementing them in native Rust code — we call existing Java/Spark code, but operate on Arrow data, avoiding an expensive transition falling back to Spark.

What changes are included in this PR?

The framework consists of:

JVM side:

  • CometUDF trait — interface that JVM UDF implementations must satisfy
  • CometUdfBridge — JNI entry point that native execution calls to invoke a UDF; handles class instantiation caching, Arrow FFI import/export, and result validation
  • CometLambdaRegistry — thread-safe registry bridging plan-time Spark expressions to execution-time UDF lookup

Native (Rust) side:

  • JvmScalarUdfExpr — DataFusion PhysicalExpr that delegates evaluation to a JVM-side CometUDF via JNI and the Arrow C Data Interface
  • CometUdfBridge JNI handle in jni-bridge — caches class/method references
  • JvmScalarUdf protobuf message — serde format for transmitting UDF invocations from plan to execution

Planner integration:

  • ExprStruct::JvmScalarUdf handling in the native planner

This is the framework only — individual expression implementations (e.g., array_exists) will be added in follow-up PRs.

How are these changes tested?

  • Rust compilation verified (cargo check passes for all affected crates)
  • End-to-end testing will come with the first expression implementation in a follow-up PR

Add a framework that allows Comet to invoke JVM-side UDF implementations
operating on Arrow data via JNI, avoiding expensive fallback to Spark while
maintaining 100% Spark compatibility for expressions not yet implemented
natively in Rust.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw @andygrove can we use this framework for regexp udfs?

@andygrove
Copy link
Copy Markdown
Member Author

Btw @andygrove can we use this framework for regexp udfs?

Yes, there is example in #4170

It is perfect for regexp because we get 100% compatibility with almost no effoert, enabled by default

@comphead
Copy link
Copy Markdown
Contributor

comphead commented May 5, 2026

I'm also wondering can we use this framework for user udfs 🤔 currently this is a huge drawback in Comet that for user defined function we fallback as there is no way to transpile custom user code to native side, can this framework be offered to the user as an alternative. depending on UDF complexity it may or may not be easy to rewrite custom user code from Spark UDF to Comet Java UDF. For example I anticipate some problems if the user works on the row level, i.e update some specific values in the row and in Arrow Java it might be more complicated but still promising

@andygrove
Copy link
Copy Markdown
Member Author

I'm also wondering can we use this framework for user udfs 🤔 currently this is a huge drawback in Comet that for user defined function we fallback as there is no way to transpile custom user code to native side, can this framework be offered to the user as an alternative. depending on UDF complexity it may or may not be easy to rewrite custom user code from Spark UDF to Comet Java UDF. For example I anticipate some problems if the user works on the row level, i.e update some specific values in the row and in Arrow Java it might be more complicated but still promising

I am already working on enable this in #4233

* time the serde layer registers a lambda expression under a unique key; at execution time the
* UDF retrieves it by that key (passed as a scalar argument).
*/
object CometLambdaRegistry {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is CometLambdaRegistry used?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// explicit per-task isolation.
private static final int CACHE_CAPACITY = 64;

private static final ThreadLocal<LinkedHashMap<String, CometUDF>> INSTANCES =
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we ensure one instance per thread? Spark/Hive UDFs don't seem to do this.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that is a good point, will take another look

@mbutrovich
Copy link
Copy Markdown
Contributor

This PR got me thinking about whether the per-expression CometUDF pattern could be generalized, and I prototyped a generic dispatcher on top of this PR's framework. Branch: https://github.com/mbutrovich/datafusion-comet/tree/jvm-udf-generic-dispatcher. The core file is CometGenericExpressionUDF.scala.

What it does

CometGenericExpressionUDF is one CometUDF class that evaluates an arbitrary Spark Expression tree. At plan time, the serde registers the bound expression in CometLambdaRegistry (UUID-keyed), emits a JvmScalarUdf proto pointing at the generic class, and passes data attributes as args. At execution time the UDF looks up the expression, compiles it once via GenerateMutableProjection, and loops over the input Arrow vectors using a reused SpecificInternalRow.

Benefits

One JVM-side class handles any scalar Spark Expression, with no per-expression hand-coded evaluator required.

The dispatcher evaluates composed expression trees in one JNI hop. If a child node (e.g. upper(col) inside rlike(upper(col), pattern)) isn't supported natively, the whole tree still evaluates together without forcing whole-plan fallback.

Benchmarks competitively with the hand-coded RegExpLikeUDF from #4239. Spark's MutableProjection codegen produces the same hot-loop shape (bytes to UTF8String to eval to result) that a hand-written loop does, so there is no inherent per-row dispatcher overhead.

One Janino compile per expression tree, cached by registry key.

Limitations

Near-term, fixable with incremental work

  • Types are prototype-narrow. Input is VarCharVector only, output is BitVector only. Widening is mechanical: build an Array[ColumnReader] and a ResultWriter at cache-miss time, dispatching on Arrow type and expression.dataType once per expression. Scaladoc in the prototype file sketches the shape.
  • CometLambdaRegistry is JVM-local. Driver and executor sharing a JVM works for local Spark only. Cluster mode requires serializing the bound Expression into the proto (Java serialization or Kryo) and dropping the UUID key.
  • Only CometRLike is wired for the generic path in this prototype. The serde logic is not RLike-specific and can be extracted to a single helper that any CometExpressionSerde opts into with one line.
  • Nondeterministic expressions (rand, monotonically_increasing_id) need an initialize(partitionIndex) call before the first row. Easy to add at cache-miss time.
  • Registry entries are never removed, which leaks for long-running drivers.
  • VarCharVector.get(i) copies bytes into a fresh byte[] per row. Matches RegExpLikeUDF, so the A/B comparison is fair, but both paths would improve with a reusable NullableVarCharHolder or UTF8String.fromAddress.

Longer-term, may never fully reach

  • Aggregates, window functions, and generators do not fit the CometUdfBridge "one result vector per input, same length" contract. Each needs its own bridge signature.
  • Python and Pandas UDFs are reachable in principle (they are Expression subclasses). Whether the per-row socket IPC to the Python worker is cheaper than whole-plan fallback would need to be measured.
  • Performance parity with native Rust on expressions that emit per-row allocations (decimals, arrays, strings out) is unlikely. JVM boxing through UnsafeRow and ArrayData is inherent to the evaluation shape, whereas native Rust writes directly into Arrow buffers.
  • Cross-Spark-version stability of Expression serialization is fragile. Spark internals change between releases, and a cluster-mode implementation would need a compatibility story.

Benchmark numbers

Per-row nanoseconds, lower is better. Apple M3 Max, OpenJDK 11.0.30+7-LTS, macOS 26.4.1. Source: CometRegExpBenchmark with one extra case added for the generic dispatcher.

Pattern Spark Comet (Scan) Comet (Exec, native Rust) Comet (Exec, JVM hand-coded) Comet (Exec, JVM generic)
character_class [0-9]+ 12561.0 10616.9 4764.3 4377.9 4293.4
anchored ^[0-9] 9077.1 8776.9 3463.7 3487.0 3384.8
alternation abc|def|ghi 12189.4 11970.7 6837.2 6497.1 6785.3
multi_class [a-zA-Z][0-9]+ 9394.9 10048.6 4272.1 4193.9 4343.2
repetition (ab){2,} 9160.1 9146.7 4086.7 4075.5 4125.5

The generic path tracks the hand-coded path within a few percent across all five patterns. Native Rust is competitive but not dominant on these patterns, likely because the workload favors JIT-warmed backtracking over DFA construction. On adversarial patterns or non-regex expressions with tight Rust kernels, native would be expected to pull further ahead.

I think this is a super promising direction to more quickly (and provide 100% compatibility) support UDFs! Thanks @andygrove!

@mbutrovich
Copy link
Copy Markdown
Contributor

mbutrovich commented May 7, 2026

Follow-up to my earlier MutableProjection dispatcher comment. After more experimentation, the variant now has a default path that covers any scalar Spark Expression, a per-expression specialization hook for the subset where Spark's doGenCode imposes a round-trip cost, and a small set of universal boundary optimizations applied mechanically to every compiled kernel. Benchmark results are at parity with the hand-coded UDFs across every pattern measured, and the architectural value is concentration of the optimization surface rather than an inherent perf win. Branch: https://github.com/mbutrovich/datafusion-comet/tree/jvm-udf-generic-dispatcher. Core files: CometBatchKernelCodegen.scala, CometCodegenDispatchUDF.scala, ArrowBackedRow.scala.

What it does

Same plan-time wiring as the MutableProjection variant (bind the Spark Expression, register it in CometLambdaRegistry, emit a JvmScalarUdf proto keyed by registry UUID). At execute time, CometBatchKernelCodegen.compile emits a specialized CometBatchKernel:

void process(VarCharVector[] inputs, FieldVector outRaw, int numRows) {
  ConcreteOutVector output = (ConcreteOutVector) outRaw;
  ArrowBackedRow row = new ArrowBackedRow(inputs);
  for (int i = 0; i < numRows; i++) {
    row.setRowIdx(i);
    <inlined expr.genCode(ctx) output>
    if (<result.isNull>) output.setNull(i);
    else <write snippet chosen by dataType>;
  }
}

ArrowBackedRow extends InternalRow via a CometInternalRow shim. BoundReference.genCode resolves row.getUTF8String(ord) to a direct Arrow read at the current row index. One Janino compile per expression tree, cached by registry key.

Default path vs specialized path

Default path reuses Spark's doGenCode unchanged. Works for every scalar Expression whose generated code stays in UTF8String space end to end. Rlike, numeric-output expressions, short string transforms all land here.

Specialized path bypasses doGenCode via a match case in CometBatchKernelCodegen.compile. Each specialization is roughly 15 lines emitting Java that mirrors the hand-coded implementation for that expression. Needed because Spark's doGenCode sometimes internally round-trips through Java String. For example, java.util.regex.Matcher requires a CharSequence, so RegExpReplace.doGenCode decodes UTF8String -> String, runs the matcher, then encodes String -> UTF8String on the return. That cost is inherent to Spark's internal row shape and cannot be eliminated by optimizing the Arrow boundary alone. The specialization emits the hand-coded shape directly so no UTF8String appears in the per-row loop.

Currently one specialization lives in the tree: RegExpReplace. The mechanism is general.

Universal boundary optimizations

Applied to every compiled kernel in the default path, no per-expression knowledge required:

  • Zero-copy UTF8String reads. ArrowBackedRow.getUTF8String wraps the Arrow data buffer's native address directly via UTF8String.fromAddress. Skips the byte[] allocation that VarCharVector.get(i) would otherwise pay.
  • Pre-sized variable-length output buffers. For StringType output, the caller passes an input-data-buffer-sized byte estimate to allocateOutput, which uses the two-arg allocateNew(bytes, rows). Reduces mid-loop setSafe reallocations.
  • NullIntolerant short-circuit. For expressions implementing Spark's NullIntolerant marker trait (null in any input results in null output), the emitter prepends a pre-check on input nullity that skips expression evaluation entirely for null rows, not just the output write.

Three approaches side by side

Aspect Hand-coded CometUDF (this PR) Generic MutableProjection Arrow-direct codegen
Classes per expression (default path) One Zero Zero
Classes per expression (specialized) n/a n/a ~15-line match case, one file
Per-row batch loop Hand-written Scala Interpreted Scala Compiled Java
Arrow read and write Hand-written Interpreted Scala Compiled Java
Expression evaluation Hand-written Compiled (projection codegen) Compiled (doGenCode), inlined into the fused loop
Composed expression trees No, without native support for children Yes Yes
Adding a new scalar expression New UDF class + serde branch Free in the supported type set Free in the supported type set
Current input types Per UDF StringType StringType
Current output types Per UDF BooleanType BooleanType + StringType
Widening inputs Per UDF Add ColumnReader cases Add getters to ArrowBackedRow
Widening outputs Per UDF Dispatch in evaluate Dispatch in codegen template
Cluster mode today Works Local only Local only
Aggregates, windows, generators Requires bridge change Same Same

Benchmark numbers

Per-row nanoseconds, lower is better. Apple M3 Max, OpenJDK 11.0.30+7-LTS, macOS 26.4.1. From CometRegExpBenchmark.

rlike patterns (6 JVM variants including MutableProjection):

Pattern Spark Comet (Scan) Native Rust JVM hand-coded JVM MutableProjection JVM codegen
character_class [0-9]+ 14490 12551 6382 5042 5124 5173
anchored ^[0-9] 10558 10383 3988 3985 4074 3788
alternation abc|def|ghi 13977 14982 8164 8096 8242 7868
multi_class [a-zA-Z][0-9]+ 10545 10979 4829 4833 5338 5236
repetition (ab){2,} 10848 10026 4659 4807 4598 4506

regexp_replace (5 variants; MutableProjection dispatcher omitted because its current output writer is BitVector-only):

Pattern Spark Comet (Scan) Native Rust JVM hand-coded JVM codegen (specialized)
replace_no_match xyzzy 9896 9089 3524 3233 3545
replace_small_match \d+ -> N 10575 10273 4294 4090 4370
replace_wide_match [a-zA-Z0-9] -> * 20564 20292 10618 10499 10508

Reading:

  • rlike: codegen dispatch is within run-to-run noise of hand-coded on every pattern. Some cells are faster, some slower, no pattern to the direction.
  • regexp_replace without specialization: an earlier measurement showed codegen at 12794 vs hand-coded 8880 on replace_wide_match, a 44% gap, driven by the UTF8String round-trip inside Spark's RegExpReplace.doGenCode.
  • regexp_replace with specialization: 10508 vs 10499, gap closed. The ~2-6% deltas on replace_no_match and replace_small_match are within the variance we see between runs (hand-coded replace_no_match alone has shifted from 3070 to 3596 to 3233 across three runs).

The honest architectural read

The benchmark claim is "codegen dispatch matches hand-coded." The larger structural claim is where the maintenance math lives.

Each of the universal boundary optimizations above (zero-copy reads, pre-sized output, NullIntolerant short-circuit) could be retrofitted into every hand-coded CometUDF. Doing that would keep hand-coded ahead or at parity on these workloads. The cost is that each optimization becomes N independent code changes across N UDFs, with every future UDF inheriting the obligation to apply them.

The dispatcher applies the same optimizations in one place, propagating to every expression that uses it. That is the architectural argument, not the raw perf numbers. Perf parity says using the dispatcher is free; optimization concentration says the dispatcher is cheaper to improve and maintain going forward.

Limitations

Carried over from the MutableProjection variant:

  • CometLambdaRegistry is JVM-local. Cluster mode needs serialized-expression bytes in the proto.
  • Scalar shape only. Aggregates, windows, generators need a different bridge.
  • Nondeterministic.initialize(partitionIndex) is not wired.
  • Registry entries are never removed.

Specific to the codegen variant:

  • ArrowBackedRow implements isNullAt and getUTF8String. Other getters inherit the CometInternalRow shim's throwing defaults until widened.
  • The codegen template's output write dispatches on BooleanType and StringType. New output types are additive cases.
  • Dictionary-encoded Arrow inputs are not handled.
  • Specializations are harder to debug than hand-coded UDFs. Generated Java has no source file, so a bug surfaces as a Janino compile error or a stack trace through a synthetic class. Cognitive load per specialization is comparable to a hand-coded UDF even though the line count is smaller.

Widening is local: one getter in ArrowBackedRow per new input Arrow type, one case in the codegen template per new output Spark type. Expression evaluation comes from Spark's doGenCode, so composition is free at the default-path level.

On #4239

The follow-up PR #4239 adds hand-coded UDFs covering the remaining regex expressions on this framework. LLM-assisted authoring makes that work cheap at write time, but every class is code to review, test against Spark semantics across versions, and maintain as Spark evolves. The generic dispatcher covers the same surface with the default path, and the few expressions whose doGenCode imposes a round-trip cost can be specialized in one file without adding per-expression classes. Worth weighing the two directions before merging, since committing to the hand-coded catalogue makes a later switch to the dispatcher path harder to justify.

Bottom line

Codegen dispatch matches hand-coded on every pattern benchmarked, covers any scalar Expression with zero per-expression code on the default path, and supports specialization in one file for the small set of expressions that need it. Universal boundary optimizations live in one place rather than per-UDF, which is the architectural reason to prefer this direction even when the raw per-row numbers are tied. Thanks again @andygrove, this has been a very fun direction to experiment with.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants